[redis-rs][core] Move connection refresh to the background #2915

Open

GilboaAWS wants to merge 19 commits into main from reconnect_to_bg

Conversation

GilboaAWS (Collaborator)

Issue link

This Pull Request is linked to issue (URL): [#2910]

Checklist

Before submitting the PR make sure the following are checked:

  • This Pull Request is related to one issue.
  • Commit message has a detailed description of what changed and why.
  • Tests are added or updated.
  • CHANGELOG.md and documentation files are updated.
  • Destination branch is correct - main or release
  • Create merge commit if merging release branch into main, squash otherwise.

@GilboaAWS GilboaAWS requested a review from a team as a code owner January 5, 2025 09:55
@GilboaAWS GilboaAWS requested review from barshaul and ikolomi January 5, 2025 10:04
@ikolomi ikolomi (Collaborator) left a comment

This is a substantial change in the core mechanics. It must include tests that fail without this change, to prove its necessity.

@nihohit (Contributor) commented Jan 6, 2025

Instead of splitting your state between multiple maps and leaving room for error in reading the wrong map, consider just using an enum representing each connection state (Usable, Reconnecting, etc.) in the main connection map.

See

for an example on how to do this.
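A minimal sketch of that shape (the type and variant names here are illustrative, not taken from this PR):

    use std::collections::HashMap;

    // One map with one state enum per node: lookups can never consult the
    // "wrong" map, and the refresh task swaps Reconnecting back to Usable
    // when it finishes.
    enum ConnectionState<C> {
        Usable(C),
        Reconnecting,
    }

    struct ConnectionsContainer<C> {
        connections: HashMap<String, ConnectionState<C>>,
    }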

@barshaul barshaul (Collaborator) left a comment


First comments (stopped before fn update_refreshed_connection).
Please see all inline comments, and check how this behaves in cases where refresh connections is called with only management connections, only user connections, or both. Also, maybe I just haven't gotten to it yet, but the refresh task should be bound to the lifetime of the clusterNode / address, so that it is cancelled when refresh_slots removes it from the topology.
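One way to express that lifetime coupling, sketched under assumed names (the actual task bookkeeping in this PR may differ):

    use tokio::task::JoinHandle;

    // Hypothetical: keep the refresh task's handle next to the address entry,
    // so dropping the entry (e.g. when refresh_slots removes the node from
    // the topology) aborts the in-flight refresh instead of leaving it
    // running in the background.
    struct RefreshTaskState {
        handle: JoinHandle<()>,
    }

    impl Drop for RefreshTaskState {
        fn drop(&mut self) {
            self.handle.abort();
        }
    }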

(Three inline comment threads on glide-core/redis-rs/redis/src/cluster_async/mod.rs were marked outdated and resolved.)
@@ -1798,9 +1858,8 @@ where
         if !failed_connections.is_empty() {
             Self::refresh_connections(
                 inner,
-                failed_connections,
+                failed_connections.into_iter().collect::<HashSet<String>>(),
Collaborator

It creates an unnecessary copy. Instead, you can change failed_connections to be a set to begin with.
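A sketch of what "a set to begin with" could look like at the collection site (the surrounding names and types are assumed, not from the diff):

    use std::collections::HashSet;

    // Collect failed addresses directly into a HashSet instead of building
    // a Vec first and converting it later; the extra copy disappears.
    fn collect_failed(results: Vec<(String, Result<(), ()>)>) -> HashSet<String> {
        results
            .into_iter()
            .filter_map(|(address, result)| result.is_err().then_some(address))
            .collect()
    }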

@GilboaAWS GilboaAWS force-pushed the reconnect_to_bg branch 2 times, most recently from 40744cb to 4e6535f on January 7, 2025 09:54
@GilboaAWS GilboaAWS marked this pull request as draft January 7, 2025 11:48
@GilboaAWS GilboaAWS self-assigned this Jan 7, 2025
@GilboaAWS GilboaAWS force-pushed the reconnect_to_bg branch 11 times, most recently from a8588c1 to f51b647 on February 9, 2025 10:09
@GilboaAWS GilboaAWS marked this pull request as ready for review February 9, 2025 10:48
This reverts commit 4e6535f.

Signed-off-by: GilboaAWS <[email protected]>
…eturned the refresh_connection logic of sending the connection but without removing it from the connection_map

Signed-off-by: GilboaAWS <[email protected]>
Signed-off-by: GilboaAWS <[email protected]>
…tion before redirecting to a random node

Signed-off-by: GilboaAWS <[email protected]>
Signed-off-by: GilboaAWS <[email protected]>
Signed-off-by: GilboaAWS <[email protected]>
Signed-off-by: GilboaAWS <[email protected]>
…ot needed in rust where tokio runs the synced poll_flush to the end and only then let the refresh task to update the connection map

Signed-off-by: GilboaAWS <[email protected]>
@barshaul barshaul (Collaborator) left a comment


See comments inline

trace!("refresh_and_update_connections: calling trigger_refresh_connection_tasks");
Self::trigger_refresh_connection_tasks(
inner.clone(),
addresses.clone(),
Collaborator

why do you clone it?

Collaborator (Author)

Because the Rust compiler demands it.

.read()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.collect_refresh_notifiers(&addresses);
Collaborator

can't trigger_refresh_connection_tasks return the notifiers?

Collaborator

then you won't need to clone the addresses

Collaborator (Author)

The cost of cloning the addresses isn't a problem, especially since it happens on the slow path.
I think it's clearer to collect them with a dedicated function where needed; as we can see, it's only used here.
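For reference, a sketch of the refactor the reviewer describes, with illustrative signatures (returning the notifiers from the spawn site would remove the second read-lock pass and the address clone in the caller):

    use std::collections::HashSet;
    use std::sync::Arc;
    use tokio::sync::Notify;

    // Illustrative only: spawn one refresh task per address and hand the
    // per-task notifiers straight back to the caller.
    fn trigger_refresh_tasks(addresses: HashSet<String>) -> Vec<Arc<Notify>> {
        addresses
            .into_iter()
            .map(|address| {
                let notify = Arc::new(Notify::new());
                let task_notify = notify.clone();
                tokio::spawn(async move {
                    // ... reconnect to `address` here, then wake the waiters.
                    let _ = address;
                    task_notify.notify_waiters();
                });
                notify
            })
            .collect()
    }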

@@ -1371,62 +1374,147 @@ where
}
}

async fn refresh_connections(
// Creates refresh tasks, await on the tasks' notifier and the update the connection_container.
Collaborator

"...and the update the connection_container."
It doesn't update the connection container. the description of this function isn't clear - please fix it

Collaborator (Author)

You're 100% right.
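If it helps, a possible corrected description as a sketch (the signature shown and the elided body are assumptions, not this PR's code):

    /// Creates refresh-connection tasks for the given addresses and awaits
    /// their completion notifiers. The connection container is updated by
    /// the refresh tasks themselves, not by this function.
    async fn refresh_connections(addresses: std::collections::HashSet<String>) {
        let _ = addresses; // body elided in this sketch
    }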

Comment on lines 1403 to 1407
let futures: Vec<_> = refresh_task_notifiers
.iter()
.map(|notify| notify.notified())
.collect();
futures::future::join_all(futures).await;
Collaborator

I think you can skip the memory allocation you get from 'collect' with:
join_all(refresh_task_notifiers.iter().map(|notify| notify.notified())).await;
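A self-contained version of that suggestion; futures::future::join_all accepts any IntoIterator of futures, so the explicit intermediate Vec is unnecessary:

    use futures::future::join_all;
    use std::sync::Arc;
    use tokio::sync::Notify;

    // Await every refresh notifier without first collecting the futures
    // into a separate Vec.
    async fn wait_for_all(refresh_task_notifiers: &[Arc<Notify>]) {
        join_all(refresh_task_notifiers.iter().map(|notify| notify.notified())).await;
    }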

.refresh_address_in_progress
.contains_key(&address)
{
info!("Skipping refresh for {}: already in progress", address);
Collaborator

Should be changed to debug!, as we can get a lot of this log when a node is down and we have reconnection retries.

RefreshConnectionType::AllConnections,
None,
core.glide_connection_options.clone(),
false,
Collaborator

why false?

Collaborator (Author)

Because when sending RefreshConnectionType::AllConnections, there is no need to reuse the old connection...
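The reasoning, sketched with the variant names visible in this diff (treat the enum definition and helper as assumptions about the surrounding code):

    enum RefreshConnectionType {
        OnlyUserConnection,
        OnlyManagementConnection,
        AllConnections,
    }

    // With AllConnections every connection to the node is rebuilt from
    // scratch, so there is nothing old worth carrying over; reuse only
    // makes sense when a single connection type is refreshed.
    fn should_reuse_old_connection(conn_type: &RefreshConnectionType) -> bool {
        !matches!(conn_type, RefreshConnectionType::AllConnections)
    }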

}
};

let mut conn_option = None;
Collaborator

let conn_option = if let Some(refresh_notifier) = reconnect_notifier {
    ...
} else {
    None
};

None,
ConnectionCheck::OnlyAddress(address) => {
// No connection for this address in the conn_map
Self::trigger_refresh_connection_tasks(
Collaborator

again - can trigger_refresh_connection_tasks simply return the notifiers?

Collaborator

then it would reduce a lot of logic (lines 2423 and above)

.get_node()
.await;

let reconnect_notifier: Option<Arc<Notify>> = match core
Collaborator

nit: reduce redundant code lines:

    let reconnect_notifier = core
        .conn_lock
        .read()
        .expect(MUTEX_READ_ERR)
        .refresh_conn_state
        .refresh_address_in_progress
        .get(&address)
        .and_then(|refresh_task_state| {
            if let RefreshTaskStatus::Reconnecting(refresh_notifier) = &refresh_task_state.status {
                Some(refresh_notifier.get_notifier())
            } else {
                None
            }
        });

Collaborator

ignore if you change trigger_refresh_connection_tasks to return the notifiers

@@ -66,6 +66,7 @@ async def test_update_connection_password(
     assert value == b"test_value"
     await glide_client.update_connection_password(None)
     await kill_connections(management_client)
+    await asyncio.sleep(1)
Collaborator

why?

Collaborator (Author)

We want to make sure the connections have already been re-established.

Signed-off-by: GilboaAWS <[email protected]>